Create a Spark session
In [94]:
import sys, glob, os
SPARK_HOME = "/Users/abulbasar/Downloads/spark-2.3.1-bin-hadoop2.7"
sys.path.append(SPARK_HOME + "/python")
sys.path.append(glob.glob(SPARK_HOME + "/python/lib/py4j*.zip")[0])
from pyspark.sql import SparkSession, functions as F
spark = (SparkSession
.builder
.config("spark.master", "local[*]")
.config("spark.driver.memory", "4G")
.config("spark.sql.shuffle.partitions", 16)
.getOrCreate())
sc = spark.sparkContext
print(sc.uiWebUrl)
sql = spark.sql
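If the findspark package is installed, the sys.path wiring above can be handled for you; a minimal sketch, assuming SPARK_HOME points to the same directory as above:
In [ ]:
import findspark
findspark.init(SPARK_HOME)  # adds the pyspark and py4j zip paths for this SPARK_HOME
from pyspark.sql import SparkSession, functions as F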
Create a utility function to load data into a Spark dataframe
In [95]:
import re
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline
base_dir = "/data/movielens/"
def cache_df(df, name):
    df.createOrReplaceTempView(name)
    spark.catalog.cacheTable(name)

def load(file):
    name = re.sub("[^A-Za-z0-9]", "_", file[:-4])
    df = spark.read.option("header", True).option("inferSchema", True).csv(base_dir + file)
    cache_df(df, name)
    df = df.alias(name)
    return df
Load data
In [96]:
movies = load("movies.csv")
ratings = load("ratings.csv")
sql("show tables").show()
Take a look at the movies dataset. By default, show() displays the first 20 records.
In [97]:
movies.show()
View the ratings data.
In [98]:
ratings.show()
How many rating levels are there? Find the count at each rating level.
In [99]:
ratings.groupBy("rating").count().toPandas().sort_values("rating")
Out[99]:
Find the top 10 movies based on the highest average rating. Each top movie must have more than 100 ratings for the average to be reliable.
In [124]:
top10 = (ratings
.groupBy("movieId")
.agg(F.avg("rating").alias("avg_rating"), F.count("*").alias("rating_count"))
.filter("rating_count > 100")
.join(movies, on = "movieId")
.select("movieId", "avg_rating", "title")
.orderBy(F.desc("avg_rating"))
.limit(10)
)
top10.show()
Does the rating pattern change from user to user? For example, does one user tend to give higher ratings than another? Let's take the top movies found above, group the ratings by user, and look at each user's average rating.
In [131]:
(ratings
.join(top10, on = "movieId")
.groupBy("userId")
.agg(F.avg("rating").alias("avg_rating"))
).toPandas()["avg_rating"].plot.hist(bins = 50)
Out[131]:
We notice that even movies generally considered good show a wide range of ratings among viewers.
Convert the fractional rating values to integers to reduce the number of rating levels.
In [100]:
ratings = ratings.withColumn("rating", F.expr("cast(rating as int)"))
ratings.groupBy("rating").count().toPandas().sort_values("rating")
Out[100]:
Plot a histogram of the average rating per user to see its distribution.
In [101]:
ratings.groupBy("userId").agg(F.avg("rating").alias("avg_rating")).toPandas().avg_rating.plot.hist(bins = 50)
Out[101]:
How has the average rating changed over the years?
In [102]:
avg_rating_by_year = (ratings
.withColumn("year", F.expr("year(from_unixtime(timestamp))"))
.groupBy("year")
.agg(F.avg("rating").alias("avg_rating"))
).toPandas()
avg_rating_by_year = avg_rating_by_year.sort_values("year")
avg_rating_by_year.index = avg_rating_by_year.year
avg_rating_by_year.avg_rating.plot()
plt.ylabel("Avg Rating")
Out[102]:
Find the number of unique users.
In [103]:
ratings.selectExpr("count(distinct userId)").first()
Out[103]:
Find the number of unique movieIds.
In [104]:
ratings.selectExpr("count(distinct movieId)").first()
Out[104]:
Are all the movieIds mentioned in ratings present in the movies table as well? The left anti join below should return no rows if all values are present.
In [105]:
ratings.join(movies, on = ["movieId"], how = "leftanti").show()
What is the distribution of the number of ratings per movie?
In [106]:
ratings.groupBy("movieId").count().select("count").describe().show()
Find the rating count and the average rating for each user.
In [107]:
avg_rating = ratings.groupBy("userId").agg(F.count("*").alias("count"), F.avg("rating").alias("avg_rating"))
avg_rating.show()
Which movies receive consistent ratings from users? We can take the standard deviation as a measure of consistency; sorting by descending std below puts the least consistent movies first.
In [108]:
(ratings
.groupBy("movieId")
.agg(F.avg("rating")
, F.stddev("rating").alias("std")
, F.count("*").alias("count"))
.filter("not isnan(std)")
.orderBy(F.desc("std"))
).show()
Divide the dataset into training and test sets.
In [16]:
df_train, df_test = (ratings.randomSplit([0.7, 0.3], seed = 1))
cache_df(df_train, "df_train")
Apply matrix factorization using ALS (alternating least squares) from Spark ML.
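In rough terms, ALS factorizes the sparse user-movie rating matrix into low-rank user and item factor matrices, alternating between solving for one while the other is held fixed. A sketch of the regularized objective (notation is mine, not taken from the Spark docs):

$$\min_{U,V}\ \sum_{(u,i)\ \text{observed}} \left(r_{ui} - \mathbf{u}_u^{\top}\mathbf{v}_i\right)^2 \;+\; \lambda\left(\sum_u \lVert\mathbf{u}_u\rVert^2 + \sum_i \lVert\mathbf{v}_i\rVert^2\right)$$

Here $r_{ui}$ is an observed rating, $\mathbf{u}_u$ and $\mathbf{v}_i$ are the rank-$k$ user and item factor vectors (rank = 10 below), and $\lambda$ corresponds to ALS's regParam.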
In [17]:
from pyspark.ml.recommendation import ALS
In [109]:
als = ALS(rank=10, maxIter=5, seed=0,
userCol= "userId", itemCol= "movieId", ratingCol="rating")
als_model = als.fit(df_train)
View the item factors. These factors are learned by the ALS model and represent latent properties of each movie. The rank, or dimension, is 10 as we specified in the ALS model.
In [19]:
als_model.itemFactors.limit(10).toPandas()
Out[19]:
Find all the movies rated by userId = 100; it is just an example user.
In [20]:
ratings.filter("userId = 100").join(movies,
on = "movieId").orderBy(F.desc("rating")).show()
We see movieId = 32 has been one of the top-rated movies for user 100. Find movies similar to movie 32. For similarity, cosine distance is computed on the item factor vectors.
In [68]:
from pyspark.sql.types import DoubleType
import numpy as np
import scipy
import scipy.spatial
def distance(v1, v2):
    # Cosine distance (1 - cosine similarity): smaller values mean more similar vectors.
    v1 = np.array(v1)
    v2 = np.array(v2)
    return float(scipy.spatial.distance.cosine(v1, v2))
spark.udf.register("distance", distance, DoubleType())

def recommendation_by_i2i(movie_id):
    return (als_model
            .itemFactors
            .filter(F.col("id") == movie_id)
            .alias("t1")
            .crossJoin(als_model.itemFactors.alias("t2"))
            .withColumn("similarity", F.expr("distance(t1.features, t2.features)"))
            .join(movies, F.col("t2.id") == F.col("movieId"))
            .orderBy(F.asc("similarity"))
            .select("movieId", "title", "similarity")
            )
recommendation_by_i2i(32).show(10, False)
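A quick local sanity check of the distance helper, to make the naming explicit: although the derived column is called similarity, it holds the cosine distance, so smaller values mean more similar vectors.
In [ ]:
# Plain-Python check of the UDF's logic (not a Spark job).
print(distance([1.0, 0.0], [1.0, 0.0]))  # 0.0 -> identical direction
print(distance([1.0, 0.0], [0.0, 1.0]))  # 1.0 -> orthogonal vectors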
In [71]:
recommendation_by_i2i(10).show(10, False) # movieId for GoldenEye (1995) is 10
Like the item factors, we can view the user factors, which represent the latent properties of each user.
In [72]:
als_model.userFactors.limit(10).toPandas()
Out[72]:
Find the top movies for each user; assume the top movies are those the user has rated 4 or 5.
In [82]:
top_rated_movies_by_user = (ratings
.filter("rating = 4 or rating = 5")
.groupBy("userId")
.agg(F.collect_set("movieId").alias("top_movies")))
top_rated_movies_by_user.show()
In [91]:
def recommendation_by_u2u(user_id):
    return (als_model
            .userFactors
            .filter(F.col("id") == user_id)
            .alias("t1")
            .crossJoin(als_model.userFactors.alias("t2"))  # compare this user's factors against every other user's factors
            .withColumn("similarity", F.expr("distance(t1.features, t2.features)"))
            .filter("similarity < 0.03")  # distance threshold - a hyperparameter; tuning can find a suitable value
            .join(top_rated_movies_by_user.alias("t3"), F.col("t2.id") == F.col("t3.userId"))
            .select("t1.id", F.explode("top_movies").alias("movieId"))
            .join(movies, on = "movieId")
            .select("movieId", "title")
            )
recommendation_by_u2u(100).show(10, False)
In [134]:
(ratings
.join(movies, on = "movieId")
.filter("userId = 100")
.orderBy(F.desc("rating"))
).show()
Let's see what results a similarity match between the user factors and the movie factors gives.
In [153]:
def recommendation_by_u2i(userId):
    return (als_model
            .userFactors.alias("t1")
            .filter(F.col("id") == userId)
            .crossJoin(als_model.itemFactors.alias("t2"))
            .withColumn("similarity", F.expr("distance(t1.features, t2.features)"))
            .join(movies, F.col("t2.id") == F.col("movieId"))
            .select("similarity", "title")
            .orderBy(F.asc("similarity"))  # sort after the join so limit() keeps the closest movies
            .limit(20)
            )
recommendation_by_u2i(100).show(20, False)
Predict the rating for each movie and user combination in the df_test dataset.
In [23]:
df_prediction = als_model.transform(df_test)
df_prediction.show()
Find the predicted ratings for user 575 and compare them against the actual ratings.
In [24]:
df_prediction.filter("userId = 575").show()
For some movies the prediction is NaN. Likely, those movies do not have any records in df_train.
In [25]:
df_prediction.filter("isnan(prediction)").orderBy("movieId").show()
In [26]:
df_prediction.filter("movieId = 148").show()
In [27]:
df_train.filter("movieId = 148").show()
In [28]:
als_model.itemFactors.filter("id = 148").show()
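Movie 148 appears in df_test but has no item factors, which is why its prediction is NaN. Since Spark 2.2, ALS accepts a coldStartStrategy parameter that drops such rows at prediction time; a minimal sketch of re-fitting the same model with it (the _drop names are just for illustration):
In [ ]:
# Same hyperparameters as above, but NaN predictions are dropped automatically.
als_drop = ALS(rank=10, maxIter=5, seed=0,
               userCol="userId", itemCol="movieId", ratingCol="rating",
               coldStartStrategy="drop")
als_model_drop = als_drop.fit(df_train)
als_model_drop.transform(df_test).filter("movieId = 148").show()  # should return no rows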
In [29]:
recommendations_by_user = als_model.recommendForAllUsers(10)
recommendations_by_user.show()
Display the list of movies recommended for the users. If you are interested in the recommendations for a given user, you can filter the result by userId.
In [30]:
recommendations_by_user_enriched = (recommendations_by_user
.withColumn("recommendation", F.explode("recommendations"))
.withColumn("movieId", F.expr("recommendation.movieId"))
.withColumn("recommended_rating", F.expr("recommendation.rating"))
.drop("recommendations")
.drop("recommendation")
.join(movies, on = "movieId")
)
recommendations_by_user_enriched.show()
In [31]:
recommendations_by_user_enriched.filter("userId = 547").show()
Find the most active users, their average rating, and the standard deviation of their ratings. We would like to see how each user's list of highly rated movies (rating 4 or 5) matches up with the recommendations.
In [32]:
rating_count_by_user = (ratings
.groupBy("userId")
.agg(F.avg("rating").alias("avg_rating")
, F.count("*").alias("rating_count")
, F.stddev("rating").alias("std_rating")
)
.orderBy(F.desc("rating_count"))
)
rating_count_by_user.show()
In [33]:
rating_count_by_user.selectExpr("mean(std_rating)").first()
Out[33]:
Plot the distribution (histogram) of the number of ratings per user.
In [34]:
rating_count_by_user.select("rating_count").toPandas()["rating_count"].plot.hist(bins = 50)
Out[34]:
See the movie recommendations for a given user, starting with one of the most active users.
In [35]:
def reco_for_user(userId):
    return (
        als_model
        .recommendForUserSubset(df_train.filter(F.col("userId") == userId), 500)
        .withColumn("recommendation", F.explode("recommendations"))
        .withColumn("movieId", F.expr("recommendation.movieId"))
        .withColumn("recommended_rating", F.expr("recommendation.rating"))
        .drop("recommendations")
        .drop("recommendation")
        .join(ratings.filter(F.col("userId") == userId), on = "movieId", how = "leftanti")  # exclude movies the user has already rated
        .join(movies, on = "movieId")
        .orderBy(F.desc("recommended_rating"))
    )
reco_for_user(547).show()
Find a few users who have a moderate number of ratings.
In [36]:
rating_count_by_user.filter("rating_count = 25").show()
In [37]:
reco_for_user(100).show()
Once more, let's see the predictions for the user-movie combinations in the df_test dataset.
In [38]:
als_model.transform(df_test).show()
Find the root mean squared error (RMSE) based on df_test.
In [39]:
(als_model
.transform(df_test)
.filter("not isnan(prediction)")
.selectExpr("sqrt(avg(pow((rating - prediction), 2))) test_rmse")
).show()
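The same metric can also be computed with Spark's built-in RegressionEvaluator, which is a handy cross-check on the SQL expression above (a minimal sketch; NaN predictions are filtered out first, as before):
In [ ]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
test_rmse = evaluator.evaluate(als_model.transform(df_test).filter("not isnan(prediction)"))
print(test_rmse)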
Find the RMSE for the training data. As expected, the RMSE for the training dataset should be better than for the test dataset; the lower the RMSE, the better.
In [40]:
(als_model
.transform(df_train)
.filter("not isnan(prediction)")
.selectExpr("sqrt(avg(pow((rating - prediction), 2))) train_rmse")
).show()
The idea is that, based on features of the movies and of the users, we can estimate what the rating will be. Movie features can be derived from attributes such as the genres; user features can be derived from each user's rating behaviour per genre, computed below.
Let's create a dataset that we can filter by genre.
In [41]:
genre_rating = (movies
.withColumn("year", F.regexp_extract("title", r"(\d+)", 1))
.withColumn("genres", F.split("genres", "\|"))
.withColumn("genre", F.explode("genres"))
.join(ratings, on = "movieId")
.withColumn("timestamp", F.expr("from_unixtime(timestamp)"))
)
genre_rating.show()
Number of unique genre values.
In [42]:
genre_rating.selectExpr("count(distinct genre)").first()
Out[42]:
In [43]:
avg_rating_by_genre = genre_rating.groupBy("genre").avg("rating").toPandas()
avg_rating_by_genre.index = avg_rating_by_genre["genre"]
avg_rating_by_genre["avg(rating)"].plot.bar()
Out[43]:
Enrich the dataset with per-genre statistics: the average rating, the (log) rating count, the rating standard deviation, the (log) number of unique users, and the (log) number of high ratings (4 or 5).
In [44]:
movie_genre_stats = (genre_rating
.withColumn("rating_avg", F.expr("avg(rating) over (partition by genre)"))
.withColumn("rating_count", F.expr("count(*) over (partition by genre)"))
.withColumn("rating_count", F.log("rating_count"))
.withColumn("rating_std", F.expr("stddev(rating) over (partition by genre)"))
.withColumn("unique_user_count_rank", F.expr("dense_rank() over (partition by genre order by userId)"))
.withColumn("unique_user_count", F.expr("max(unique_user_count_rank) over (partition by genre)"))
.withColumn("unique_user_count", F.log("unique_user_count"))
.withColumn("high_rating", F.expr("sum(if(rating = 4 or rating = 5, 1, 0)) over (partition by genre)"))
.withColumn("high_rating", F.expr("log(high_rating)"))
.drop("unique_user_count_rank")
)
movie_genre_stats.limit(10).toPandas()
Out[44]:
Since each movie has multiple genres, it is a good idea to compute IDF scores weighted by the individual measures, like the average rating. To compute the IDF scores, we pass the data through a few steps: StringIndexer => group by userId => vectorizer => IDF transformer. A toy illustration of the indexing step follows the imports below.
StringIndexer replaces each string value of the genre with a corresponding numeric index.
The group-by aggregates the genre-level stats for each userId by packing them into a set column; look at the schema below for an example.
The vectorizer creates the sparse vectors required by TF-IDF.
In [45]:
from pyspark.ml.feature import StringIndexer, IDF
from pyspark.ml.pipeline import Pipeline
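As a small illustration of the first step, here is what StringIndexer does on a toy dataframe (the genre values below are made up for the example):
In [ ]:
# Toy illustration only; the real pipeline runs on movie_genre_stats.
toy = spark.createDataFrame([("Comedy",), ("Drama",), ("Comedy",), ("Horror",)], ["genre"])
StringIndexer(inputCol="genre", outputCol="genre_index").fit(toy).transform(toy).show()
# The most frequent value gets index 0.0, the next 1.0, and so on.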
In [92]:
genre_indexer = StringIndexer(inputCol="genre", outputCol="genre_index")
stats_by_genre_indexed = genre_indexer.fit(movie_genre_stats).transform(movie_genre_stats)
stats_by_genre_indexed.limit(5).toPandas()
Out[92]:
In [47]:
stats_by_genre_structed = (stats_by_genre_indexed
.withColumn("rating_count_by_genre", F.struct("genre_index", F.col("rating_count").alias("value")))
.withColumn("rating_avg_by_genre", F.struct("genre_index", F.col("rating_avg").alias("value")))
.withColumn("rating_std_by_genre", F.struct("genre_index", F.col("rating_std").alias("value")))
.withColumn("unique_user_count_by_genre", F.struct("genre_index", F.col("unique_user_count").alias("value")))
.withColumn("high_rating_by_genre", F.struct("genre_index", F.col("high_rating").alias("value")))
)
stats_by_genre_structed.limit(5).toPandas()
Out[47]:
In [48]:
stats_by_genre_grouped_by_user = (stats_by_genre_structed
.groupBy("userId")
.agg(
F.collect_set("rating_count_by_genre").alias("rating_count_by_genre")
, F.collect_set("rating_avg_by_genre").alias("rating_avg_by_genre")
, F.collect_set("rating_std_by_genre").alias("rating_std_by_genre")
, F.collect_set("unique_user_count_by_genre").alias("unique_user_count_by_genre")
, F.collect_set("high_rating_by_genre").alias("high_rating_by_genre")
))
stats_by_genre_grouped_by_user.limit(5).toPandas()
Out[48]:
In [49]:
stats_by_genre_grouped_by_user.printSchema()
In [93]:
from pyspark.ml.linalg import SparseVector, VectorUDT
def to_vector(array_of_structs, size = 20):
    if array_of_structs is not None:
        array_of_structs = sorted(array_of_structs, key = lambda t: t.genre_index)
        indices = [int(s.genre_index) for s in array_of_structs]  # SparseVector expects integer indices
        values = [s.value for s in array_of_structs]
        return SparseVector(size, indices, values)
    return SparseVector(size, [], [])
spark.udf.register("to_vector", to_vector, VectorUDT())
stats_by_genre_grouped_by_user_vec = (stats_by_genre_grouped_by_user
.withColumn("rating_count_by_genre_vec", F.expr("to_vector(rating_count_by_genre)"))
.withColumn("rating_avg_by_genre_vec", F.expr("to_vector(rating_avg_by_genre)"))
.withColumn("rating_std_by_genre_vec", F.expr("to_vector(rating_std_by_genre)"))
.withColumn("unique_user_count_by_genre_vec", F.expr("to_vector(unique_user_count_by_genre)"))
.withColumn("high_rating_by_genre_vec", F.expr("to_vector(high_rating_by_genre)"))
)
stats_by_genre_grouped_by_user_vec.printSchema()
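A quick local check of what to_vector produces, using Row objects to mimic a couple of collected (genre_index, value) structs; the values are made up:
In [ ]:
from pyspark.sql import Row

sample = [Row(genre_index=3.0, value=4.1), Row(genre_index=0.0, value=3.5)]
print(to_vector(sample))  # sparse vector of size 20 with entries at indices 0 and 3
print(to_vector(None))    # empty sparse vector of size 20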
In [53]:
from pyspark.ml.feature import IDF
from pyspark.ml.pipeline import Pipeline
In [55]:
pipe = Pipeline(stages = [
IDF(inputCol="rating_count_by_genre_vec", outputCol="rating_count_by_genre_idf"),
IDF(inputCol="rating_avg_by_genre_vec", outputCol="rating_avg_by_genre_idf"),
IDF(inputCol= "rating_std_by_genre_vec", outputCol="rating_std_by_genre_idf"),
IDF(inputCol="unique_user_count_by_genre_vec", outputCol="unique_user_by_genre_idf"),
IDF(inputCol="high_rating_by_genre_vec", outputCol="high_rating_by_genre_idf")
])
user_profile_idf = pipe.fit(stats_by_genre_grouped_by_user_vec).transform(stats_by_genre_grouped_by_user_vec)
In [56]:
user_profile_idf.printSchema()
In [57]:
enriched = (ratings
.join(movies, on = "movieId")
.join(user_profile_idf, on = "userId")
.withColumn("rating", F.expr("cast(rating * 2 as int)"))
# .withColumn("year", F.regexp_extract("title", r"\d+", 1))
# .withColumn("age", F.expr("year(from_unixtime(timestamp)) - year"))
)
enriched.printSchema()
In [58]:
from pyspark.ml.feature import VectorAssembler
In [59]:
vector_assembler = VectorAssembler(outputCol="features", inputCols=[
'rating_count_by_genre_idf',
'rating_avg_by_genre_idf',
'rating_std_by_genre_idf',
'unique_user_by_genre_idf',
'high_rating_by_genre_idf'])
enriched_vec = vector_assembler.transform(enriched)
In [60]:
df_train, df_test = enriched_vec.randomSplit([0.7, 0.3], 1)
cache_df(df_train, "df_train")
In [61]:
df_train.printSchema()
In [62]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
In [154]:
lr = LogisticRegression(family="multinomial", maxIter=10, regParam=0.0
, elasticNetParam=0.8, featuresCol="features", labelCol="rating")
lr_model = lr.fit(df_train)
predictions = lr_model.transform(df_test)
evaluator = MulticlassClassificationEvaluator(
labelCol="rating"
, predictionCol="prediction"
, metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test accuracy = %g " % (accuracy))
In [64]:
df_train.select("rating").distinct().collect()
Out[64]: